package com.parkingwang.learning.rx1;

import rx.functions.Action1;
import rx.subjects.AsyncSubject;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;

/**
 * Console demos showing which events a subscriber receives from each of the
 * four RxJava 1.x subject types, depending on whether the events were pushed
 * before or after the call to {@code subscribe}.
 *
 * <p>A {@code Subject<T, R>} extends {@code Observable<R>} and implements
 * {@code Observer<T>}: it is both an observer and an observable, and in these
 * demos it is used as the observable side that a printing subscriber attaches to.
 *
 * <p>Usage: pass one of {@code async}, {@code behavior}, {@code publish} or
 * {@code replay} as the first program argument to pick a demo. With no
 * argument (or an unrecognized one) the replay demo runs.
 */
public class SubjectDemo {

    /** Shared subscriber callback: prints each received item. Stateless, so safe to reuse. */
    private static final Action1<String> PRINT_NEXT = new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println("onNext-----" + s);
        }
    };

    /** Shared subscriber callback: prints the error a subject terminated with. Stateless. */
    private static final Action1<Throwable> PRINT_ERROR = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            System.out.println("onError-----" + throwable);
        }
    };

    /**
     * Runs the demo selected by the first argument.
     *
     * @param args optional; {@code args[0]} may be {@code async}, {@code behavior},
     *             {@code publish} or {@code replay}. Anything else, including no
     *             arguments at all, runs the replay demo.
     */
    public static void main(String[] args) {
        String demo = (args != null && args.length > 0) ? args[0] : "replay";

        if ("async".equals(demo)) {
            asyncSubject();
        } else if ("behavior".equals(demo)) {
            behaviorSubject();
        } else if ("publish".equals(demo)) {
            publishSubject();
        } else {
            // Default keeps the historical behavior of this class: run the replay demo.
            replaySubject();
        }
    }

    /**
     * ReplaySubject demo. Events can be pushed as soon as the subject is created,
     * and a subscriber receives every event, both those sent before it subscribed
     * and those sent afterwards.
     */
    private static void replaySubject() {
        ReplaySubject<String> subject = ReplaySubject.create();

        // Sent before anyone subscribes; still delivered to the later subscriber.
        subject.onNext("hello");
        subject.onNext("world");
        subject.onNext("java");

        subject.subscribe(PRINT_NEXT, PRINT_ERROR);

        subject.onNext("A");
        subject.onNext("B");
        subject.onNext("C");
    }

    /**
     * PublishSubject demo. Events can be pushed as soon as the subject is created,
     * but a subscriber only receives the events sent after it subscribed.
     */
    private static void publishSubject() {
        PublishSubject<String> subject = PublishSubject.create();

        // Sent before anyone subscribes; the later subscriber never sees these.
        subject.onNext("hello");
        subject.onNext("world");
        subject.onNext("java");

        subject.subscribe(PRINT_NEXT, PRINT_ERROR);

        subject.onNext("A");
        subject.onNext("B");
        subject.onNext("C");
    }

    /**
     * BehaviorSubject demo. The subscribe call is the dividing line: the subscriber
     * receives only the last {@code onNext} sent before it subscribed, plus every
     * {@code onNext} sent afterwards. If nothing was sent before subscription, the
     * default value passed to {@code create} is delivered instead.
     */
    private static void behaviorSubject() {
        BehaviorSubject<String> subject = BehaviorSubject.create("default");

        // Only the last of these ("java") is delivered to the later subscriber.
        subject.onNext("hello");
        subject.onNext("world");
        subject.onNext("java");

        subject.subscribe(PRINT_NEXT, PRINT_ERROR);

        subject.onNext("A");
        subject.onNext("B");
        subject.onNext("C");
    }

    /**
     * AsyncSubject demo. Data can be pushed right after creation (no need to
     * subscribe first), but a subscriber only receives the last {@code onNext}
     * sent before {@code onCompleted()}. {@code onCompleted()} must be called,
     * otherwise nothing is delivered.
     */
    private static void asyncSubject() {
        AsyncSubject<String> subject = AsyncSubject.create();

        subject.onNext("hello");
        subject.onNext("world");
        subject.onNext("java");
        // Completion is what releases the final value ("java") to subscribers.
        // Per the RxJava docs, terminating with onError(...) here instead would
        // deliver only the error to the error callback.
        subject.onCompleted();

        subject.subscribe(PRINT_NEXT, PRINT_ERROR);
    }
}
